47f01c8e7ca269f8b9aa0b59f0a100324f01d7c7,advanced/main/java/org/neo4j/kernel/ha/MasterClient.java,MasterClient,sendRequest,#RequestType#SlaveContext#Serializer#Deserializer#,127
Before Change
    @SuppressWarnings( "unchecked" )
    BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
            channel.getPipeline().get( "blockingHandler" );
    Pair<ChannelBuffer, Boolean> messageContext = readNextMessage( channelContext, reader );
    ChannelBuffer message = messageContext.first();
    T response = deserializer.read( message );
    String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( message ) : null;
    if ( messageContext.other() )
    {
        // This message consists of multiple chunks; apply transactions as early as possible
        message = createDynamicBufferFrom( message );
        boolean more = true;
        while ( more )
        {
            Pair<ChannelBuffer, Boolean> followingMessage = readNextMessage( channelContext, reader );
            more = followingMessage.other();
            message.writeBytes( followingMessage.first() );
            message = applyFullyAvailableTransactions( datasources, message );
        }
    }
    // Here are the remaining transactions if the message consisted of multiple chunks,
    // or all transactions if it only consisted of one chunk.
    TransactionStream txStreams = type.includesSlaveContext() ?
            readTransactionStreams( datasources, message ) : TransactionStream.EMPTY;
    return new Response<T>( response, txStreams );
}
catch ( ClosedChannelException e )
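In the old code above, chunked responses are consumed eagerly: readNextMessage blocks on the pipeline's BlockingReadHandler, each chunk comes back paired with a flag saying whether more chunks follow, and everything is copied into one growing dynamic buffer while transactions are applied along the way. Below is a minimal sketch of that pattern, assuming a hypothetical readWholeMessage helper and a single leading flag byte per chunk as the continuation marker; the snippet's readNextMessage and createDynamicBufferFrom presumably do the same work in smaller steps, interleaved with applyFullyAvailableTransactions.

import java.io.IOException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.queue.BlockingReadHandler;

class EagerChunkReader
{
    // Eager accumulation as in the snippet above: block on the BlockingReadHandler for each
    // chunk, append it to one growable buffer, and stop when the continuation flag is clear.
    static ChannelBuffer readWholeMessage( BlockingReadHandler<ChannelBuffer> reader )
            throws IOException, InterruptedException
    {
        ChannelBuffer message = ChannelBuffers.dynamicBuffer();
        boolean more = true;
        while ( more )
        {
            ChannelBuffer chunk = reader.read();
            if ( chunk == null )
            {   // read() returns null once the channel has been closed
                throw new IOException( "Channel has been closed" );
            }
            more = chunk.readByte() != 0;   // assumed one-byte continuation flag
            message.writeBytes( chunk );    // copy the chunk's payload into the big buffer
        }
        return message;
    }
}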
After Change
try
{
    // Send 'em over the wire
    channelContext = getChannel();
    Channel channel = channelContext.first();
    ChannelBuffer buffer = channelContext.second();
    buffer.clear();
    buffer = new ChunkingChannelBuffer( buffer, channel, MAX_FRAME_LENGTH );
    buffer.writeByte( type.ordinal() );
    if ( type.includesSlaveContext() )
    {
        writeSlaveContext( buffer, slaveContext );
    }
    serializer.write( buffer, channelContext.third() );
    if ( buffer.writerIndex() > 0 )
    {
        channel.write( buffer );
    }
    // Read the response
    @SuppressWarnings( "unchecked" )
    BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)
            channel.getPipeline().get( "blockingHandler" );
    final Triplet<Channel, ChannelBuffer, ByteBuffer> finalChannelContext = channelContext;
    DechunkingChannelBuffer dechunkingBuffer = new DechunkingChannelBuffer( ChannelBuffers.dynamicBuffer(), reader )
    {
        @Override
        protected ChannelBuffer readNext()
        {
            ChannelBuffer result = super.readNext();
            if ( result == null )
            {
                channelPool.dispose( finalChannelContext );
                throw new HaCommunicationException( "Channel has been closed" );
            }
            return result;
        }
    };
    T response = deserializer.read( dechunkingBuffer );
    String[] datasources = type.includesSlaveContext() ? readTransactionStreamHeader( dechunkingBuffer ) : null;
    while ( dechunkingBuffer.expectsMoreChunks() )
    {
        applyFullyAvailableTransactions( datasources, dechunkingBuffer );
        if ( dechunkingBuffer.expectsMoreChunks() )
        {
            dechunkingBuffer.forceReadNextChunk();
        }
    }
    // Here are the remaining transactions if the message consisted of multiple chunks,
    // or all transactions if it only consisted of one chunk.
    TransactionStream txStreams = type.includesSlaveContext() ?
            readTransactionStreams( datasources, dechunkingBuffer ) : TransactionStream.EMPTY;
    return new Response<T>( response, txStreams );
}
catch ( ClosedChannelException e )
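The new code turns the read side around: instead of assembling chunks by hand, a DechunkingChannelBuffer wraps the BlockingReadHandler and pulls the next chunk only on demand, and the anonymous subclass turns a null read (closed channel) into an HaCommunicationException after returning the pooled channel. The snippet does not show the class itself; the sketch below is an assumed outline of its chunk-pulling logic only, with method names mirrored from the calls above (expectsMoreChunks, forceReadNextChunk, readNext) and a one-byte continuation flag assumed as the chunk header. The real class would also have to implement the full ChannelBuffer interface by delegating reads to the backing buffer, which is omitted here.

import java.io.IOException;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.queue.BlockingReadHandler;

// Minimal sketch of the lazy approach: keep one growable buffer and pull the next chunk
// from the blocking reader only when asked for it.
class DechunkingBufferSketch
{
    private final ChannelBuffer buffer;                       // holds the bytes read so far
    private final BlockingReadHandler<ChannelBuffer> reader;
    private boolean moreChunks = true;

    DechunkingBufferSketch( ChannelBuffer buffer, BlockingReadHandler<ChannelBuffer> reader )
            throws IOException, InterruptedException
    {
        this.buffer = buffer;
        this.reader = reader;
        forceReadNextChunk();                                 // the first chunk is fetched eagerly
    }

    boolean expectsMoreChunks()
    {
        return moreChunks;
    }

    // Block until the next chunk arrives and append its payload to the backing buffer.
    void forceReadNextChunk() throws IOException, InterruptedException
    {
        ChannelBuffer chunk = readNext();
        if ( chunk == null )
        {   // defensive fallback; the snippet's subclass intercepts this case earlier
            throw new IOException( "Channel has been closed" );
        }
        moreChunks = chunk.readByte() != 0;                   // assumed continuation flag
        buffer.writeBytes( chunk );
    }

    // The anonymous subclass in the snippet overrides this to dispose of the pooled channel
    // and throw HaCommunicationException when read() returns null (channel closed).
    protected ChannelBuffer readNext() throws IOException, InterruptedException
    {
        return reader.read();
    }
}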
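The write side gets the mirror treatment: buffer = new ChunkingChannelBuffer( buffer, channel, MAX_FRAME_LENGTH ) means the request is streamed to the channel frame by frame rather than being fully built up in memory first. A rough sketch of that idea follows, again with illustrative names and an assumed one-byte continuation flag rather than the class's actual layout (the real ChunkingChannelBuffer implements ChannelBuffer directly, so the calling code can keep writing to it as before).

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;

// Write-side counterpart: instead of building the whole request in memory, flush a frame to
// the channel every time it reaches the maximum size.
class ChunkingWriterSketch
{
    private final Channel channel;
    private final int maxFrameLength;
    private ChannelBuffer frame;

    ChunkingWriterSketch( Channel channel, int maxFrameLength )
    {
        this.channel = channel;
        this.maxFrameLength = maxFrameLength;
        this.frame = newFrame();
    }

    private ChannelBuffer newFrame()
    {
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
        buffer.writeByte( (byte) 1 );             // assume "more chunks follow" until done()
        return buffer;
    }

    void writeBytes( byte[] bytes )
    {
        for ( byte b : bytes )
        {
            if ( frame.writerIndex() >= maxFrameLength )
            {
                channel.write( frame );           // a full frame goes over the wire immediately
                frame = newFrame();
            }
            frame.writeByte( b );
        }
    }

    // Clear the continuation flag on the last frame and send whatever is left.
    void done()
    {
        frame.setByte( 0, (byte) 0 );
        channel.write( frame );
    }
}

Combined with the lazy read side above, this appears to be the point of the change: neither the master nor the slave has to hold a large transaction stream in memory in one piece, only roughly one frame at a time.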